Skip to content

RocketMQ提供了哪些消息过滤机制

RocketMQ提供了多种消息过滤机制,以便消费者能够根据业务需求进行精确的消息消费。主要的过滤机制包括:

  1. Tag过滤:最常用的过滤方式,消费者可以基于消息的Tag进行过滤。
  2. SQL92语法过滤:基于Message属性,以SQL92标准语法进行复杂条件过滤,该功能需要Broker支持。

1. Tag过滤

Tag过滤是RocketMQ最基本的过滤机制,性能最好,因为它是在服务器端完成的。

  • 每条消息可以有一个或多个Tag。
  • 消费者订阅时指定需接收的Tag。

代码示例

生产者发送带Tag的消息:

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;  
import org.apache.rocketmq.common.message.Message;  

public class ProducerWithTagExample {  
    public static void main(String[] args) throws Exception {  
        // 创建一个生产者实例  
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");  
        producer.setNamesrvAddr("localhost:9876");  
        producer.start();  

        // 发送消息,设置不同的Tags  
        String[] tags = {"TagA", "TagB", "TagC"};  
        for (int i = 0; i < 10; i++) {  
            String tag = tags[i % tags.length];  
            Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes());  
            producer.send(msg);  
        }  

        producer.shutdown();  
    }  
}

消费者只接收某些Tag的消息:

java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;  
import org.apache.rocketmq.common.message.MessageExt;  

import java.util.List;  

public class ConsumerWithTagExample {  
    public static void main(String[] args) throws Exception {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");  
        consumer.setNamesrvAddr("localhost:9876");  

        // 订阅包含TagA和TagB的消息  
        consumer.subscribe("TopicTest", "TagA || TagB");  

        consumer.registerMessageListener(new MessageListenerConcurrently() {  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
                for (MessageExt msg : msgs) {  
                    System.out.printf("Received message: %s, Tag: %s%n", new String(msg.getBody()), msg.getTags());  
                }  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  

        consumer.start();  
        System.out.println("Consumer started.");  
    }  
}

2. SQL92语法过滤

SQL92过滤基于用户设置的消息属性,支持更复杂的过滤条件。这种过滤机制是由服务器端进行的,需要预先配置Broker支持属性过滤。

使用方法

  1. 启动Broker时启用过滤:启动Broker时,需要配置enablePropertyFilter=true来启用SQL92语法过滤功能。
  2. 生产者设置消息属性
java
import org.apache.rocketmq.client.producer.DefaultMQProducer;  
import org.apache.rocketmq.common.message.Message;  

public class ProducerWithPropertyExample {  
    public static void main(String[] args) throws Exception {  
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");  
        producer.setNamesrvAddr("localhost:9876");  
        producer.start();  

        for (int i = 0; i < 10; i++) {  
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());  
            msg.putUserProperty("key", String.valueOf(i));  
            producer.send(msg);  
        }  

        producer.shutdown();  
    }  
}
  1. 消费者使用SQL92语法过滤
java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;  
import org.apache.rocketmq.common.message.MessageExt;  

import java.util.List;  

public class ConsumerWithSQL92Example {  
    public static void main(String[] args) throws Exception {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");  
        consumer.setNamesrvAddr("localhost:9876");  

        // 使用SQL92条件过滤消息  
        consumer.subscribe("TopicTest", MessageSelector.bySql("key between 0 and 5"));  

        consumer.registerMessageListener(new MessageListenerConcurrently() {  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
                for (MessageExt msg : msgs) {  
                    System.out.printf("Received message: %s, Property key: %s%n", new String(msg.getBody()), msg.getUserProperty("key"));  
                }  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  

        consumer.start();  
        System.out.println("Consumer started.");  
    }  
}

总结

  • Tag过滤:简单高效,适合多订阅者需接受不同子类型消息的场景。
  • SQL92过滤:灵活强大,适合需要复杂条件过滤的场景,但是需要对性能加以考虑,并且需要在Broker端配置以支持属性过滤。

通过提供这些过滤机制,RocketMQ可以大大优化消息消费的准确性和有效性,使得消息系统更能贴合业务需求。不同的机制有各自的优势,选择具体的过滤机制需根据业务需求和系统性能进行合理权衡。

更新: 2024-08-18 20:23:07
原文: https://www.yuque.com/tulingzhouyu/db22bv/quouet2xt6b2k15h